package kafka_exporter

import (
	"flag"
	"strings"

	"github.com/Shopify/sarama"
	"github.com/golang/glog"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/version"
)

func NewOpt(uri, uriZookeeper []string, labels, kafkaVersion string, defautlTopicWorkers int) *kafkaOpts {
	if len(kafkaVersion) == 0 {
		kafkaVersion = sarama.V1_0_0_0.String()
	}
	return &kafkaOpts{
		uri:                      uri,
		useSASL:                  false,
		useSASLHandshake:         true,
		saslUsername:             "",
		saslPassword:             "",
		saslMechanism:            "",
		useTLS:                   false,
		tlsCAFile:                "",
		tlsCertFile:              "",
		tlsKeyFile:               "",
		tlsInsecureSkipTLSVerify: false,
		kafkaVersion:             kafkaVersion,
		useZooKeeperLag:          false,
		uriZookeeper:             uriZookeeper,
		topicWorkers:             defautlTopicWorkers,
		labels:                   labels,
		offsetShowAll:            true,
		metadataRefreshInterval:  "30s",
	}
}

type ownDesc struct {
	clusterBrokers                     *prometheus.Desc
	topicPartitions                    *prometheus.Desc
	topicCurrentOffset                 *prometheus.Desc
	topicOldestOffset                  *prometheus.Desc
	topicPartitionLeader               *prometheus.Desc
	topicPartitionReplicas             *prometheus.Desc
	topicPartitionInSyncReplicas       *prometheus.Desc
	topicPartitionUsesPreferredReplica *prometheus.Desc
	topicUnderReplicatedPartition      *prometheus.Desc
	consumergroupCurrentOffset         *prometheus.Desc
	consumergroupCurrentOffsetSum      *prometheus.Desc
	consumergroupLag                   *prometheus.Desc
	consumergroupLagSum                *prometheus.Desc
	consumergroupLagZookeeper          *prometheus.Desc
	consumergroupMembers               *prometheus.Desc
}

type KafKaCollecter struct {
	Exporter Exporter
	Client   sarama.Client
	Registry *prometheus.Registry
	//Options  *kafkaOpts
	//Desc     ownDesc
	//mux      *http.ServeMux
}

func NewKafkaCollecter(target, kafkaVersion, zookeeper, label, topicFilter, groupFilter string, defautlTopicWorkers int) (*KafKaCollecter, error) {
	defer func() {
		if err := recover(); err != nil {
			glog.Error("new kafka collecter err: ", err)
		}
	}()
	uri := strings.Split(target, ",")
	uriZookeeper := strings.Split(zookeeper, ",")

	labels := make(map[string]string)

	if label != "" {
		for _, label := range strings.Split(label, ",") {
			splitted := strings.Split(label, "=")
			if len(splitted) >= 2 {
				labels[splitted[0]] = splitted[1]
			}
		}
	}

	opts := NewOpt(uri, uriZookeeper, label, kafkaVersion, defautlTopicWorkers)
	exporter, err := ownSetup(topicFilter, groupFilter, *opts, labels)
	if err != nil {
		glog.Error("collect setup err:", err)
		return nil, err
	}
	glog.Info("exporter ok:", uri)
	//defer exporter.client.Close()
	register := prometheus.NewRegistry()
	register.MustRegister(exporter)
	return &KafKaCollecter{
		Exporter: *exporter,
		Client:   exporter.client,
		//Desc:     desc,
		Registry: register,
	}, nil
}

func ownSetup(
	topicFilter string,
	groupFilter string,
	opts kafkaOpts,
	labels map[string]string,
) (*Exporter, error) {
	if err := flag.Set("logtostderr", "true"); err != nil {
		glog.Errorf("Error on setting logtostderr to true")
	}

	defer glog.Flush()

	glog.V(INFO).Infoln("Starting kafka_exporter", version.Info())
	glog.V(DEBUG).Infoln("Build context", version.BuildContext())
	//desc := ownDesc{}
	clusterBrokers = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "", "brokers"),
		"Number of Brokers in the Kafka Cluster.",
		nil, labels,
	)
	topicPartitions = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "topic", "partitions"),
		"Number of partitions for this Topic",
		[]string{"topic"}, labels,
	)
	topicCurrentOffset = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "topic", "partition_current_offset"),
		"Current Offset of a Broker at Topic/Partition",
		[]string{"topic", "partition"}, labels,
	)
	topicOldestOffset = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "topic", "partition_oldest_offset"),
		"Oldest Offset of a Broker at Topic/Partition",
		[]string{"topic", "partition"}, labels,
	)

	topicPartitionLeader = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "topic", "partition_leader"),
		"Leader Broker ID of this Topic/Partition",
		[]string{"topic", "partition"}, labels,
	)

	topicPartitionReplicas = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "topic", "partition_replicas"),
		"Number of Replicas for this Topic/Partition",
		[]string{"topic", "partition"}, labels,
	)

	topicPartitionInSyncReplicas = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"),
		"Number of In-Sync Replicas for this Topic/Partition",
		[]string{"topic", "partition"}, labels,
	)

	topicPartitionUsesPreferredReplica = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "topic", "partition_leader_is_preferred"),
		"1 if Topic/Partition is using the Preferred Broker",
		[]string{"topic", "partition"}, labels,
	)

	topicUnderReplicatedPartition = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "topic", "partition_under_replicated_partition"),
		"1 if Topic/Partition is under Replicated",
		[]string{"topic", "partition"}, labels,
	)

	consumergroupCurrentOffset = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "consumergroup", "current_offset"),
		"Current Offset of a ConsumerGroup at Topic/Partition",
		[]string{"consumergroup", "topic", "partition"}, labels,
	)

	consumergroupCurrentOffsetSum = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "consumergroup", "current_offset_sum"),
		"Current Offset of a ConsumerGroup at Topic for all partitions",
		[]string{"consumergroup", "topic"}, labels,
	)

	consumergroupLag = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "consumergroup", "lag"),
		"Current Approximate Lag of a ConsumerGroup at Topic/Partition",
		[]string{"consumergroup", "topic", "partition"}, labels,
	)

	consumergroupLagZookeeper = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "consumergroupzookeeper", "lag_zookeeper"),
		"Current Approximate Lag(zookeeper) of a ConsumerGroup at Topic/Partition",
		[]string{"consumergroup", "topic", "partition"}, nil,
	)

	consumergroupLagSum = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "consumergroup", "lag_sum"),
		"Current Approximate Lag of a ConsumerGroup at Topic for all partitions",
		[]string{"consumergroup", "topic"}, labels,
	)

	consumergroupMembers = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "consumergroup", "members"),
		"Amount of members in a consumer group",
		[]string{"consumergroup"}, labels,
	)
	return NewExporter(opts, topicFilter, groupFilter)

}
